package sync

import (
	"gdfs-service/api"
	"gdfs-service/conf"
	"gdfs-service/db"
	"gdfs-service/logcat"
	"gdfs-service/socket"
	"gdfs-service/utils"
	"gorm.io/gorm"
	"io"
	"net"
	"os"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// isRunning guards against concurrent full-sync transfers (set while
// connTransferTcp is active). NOTE(review): read and written without
// synchronization — safe only because all writers run on the single
// RunSyncData goroutine; confirm before adding other callers.
var isRunning = false

// wg tracks the in-flight full transfer driven by connTransferTcp.
var wg sync.WaitGroup

// syncwg tracks the periodic unsynced-file pass (intervalSyncFile).
var syncwg sync.WaitGroup

// intervalRunning prevents overlapping periodic sync passes.
var intervalRunning = false

// isStop is set when a stop-full-sync command arrives; startTransferDb
// polls it between pages. NOTE(review): it is never reset to false, so
// once set, all later transfers abort immediately — verify intended.
var isStop = false

// count is the number of sync records received so far by batchDbInfo;
// accessed via sync/atomic.
var count int64

// files buffers received records until a batch is flushed to the DB.
var files = make([]db.FileInfo, 0)

// hcount is the flush batch size (shrunk to the task total when the
// whole task fits in a single batch).
var hcount = 1000

// RunSyncData starts the background event loop that drives all sync
// activity: a 60-second ticker retries unsynced files, and the socket
// layer's command channels trigger stop, connect, batch-persist,
// server-creation and incremental-sync actions.
func RunSyncData() {
	ticker := time.NewTicker(60 * time.Second)
	go func() {
		for {
			select {
			case <-ticker.C:
				// Periodic retry of files that failed to sync.
				intervalSyncFile()
			case <-socket.StopFUllSyncChan:
				// Abort an in-progress full transfer.
				isStop = true
			case cmd := <-socket.ConnTcpChan:
				// Connect to the transfer server and start pushing data.
				connTransferTcp(cmd)
			case sdb := <-socket.SyncDbChan:
				// Buffer and batch-persist an incoming sync record.
				batchDbInfo(sdb)
			case srv := <-socket.CreateTcpServerChan:
				// Stand up a TCP server for receiving a transfer.
				createTcpServer(srv)
			case upd := <-socket.SyncUpdateChan:
				// Incremental sync of a single file.
				go sendSyncFile(upd.FileKey)
			}
		}
	}()
}

// intervalSyncFile runs one pass over unsynced files, unless a previous
// pass is still in flight.
func intervalSyncFile() {
	if intervalRunning {
		return
	}
	intervalRunning = true
	syncwg.Add(1)
	GetUnSyncFile()
	syncwg.Wait()
	intervalRunning = false
}

// searchUnsyncFile returns a gorm scope that selects records which have
// not yet been synced (sync_status = "N").
func searchUnsyncFile() func(db *gorm.DB) *gorm.DB {
	return func(tx *gorm.DB) *gorm.DB {
		return tx.Where("sync_status = ?", "N")
	}
}

//获取未同步文件并发送
func GetUnSyncFile() {
	var total int64 = 0
	var fileinos []db.FileInfo
	var scopes []func(*gorm.DB) *gorm.DB
	db.GetDB().Where("sync_status = ?", "N").Find(&fileinos).Count(&total)
	scopes = append(scopes, searchUnsyncFile())
	if total != 0 {
		//pagesize
		pagesize := 100
		//pages
		pages := int(total) / pagesize
		logcat.Info("##pages:", pages)
		for i := 1; i <= pages; i++ {
			db.PageSyncQuery(scopes, pagesize, i, "").Find(&fileinos)
			for _, f := range fileinos {
				nc := socket.GetNodeConf()
				f.SyncStatus = "Y"
				for _, s := range nc.NoticeStores {
					r := api.PostNultipartFile(f, s.StoreIp, s.HttpPort)
					if r.Code != "0" {
						f.SyncStatus = "N"
					}
					time.Sleep(time.Millisecond * 100)
				}
				db.GetDB().Save(&f)
			}

		}
	}
	syncwg.Done()
}

//增量数据同步
func sendSyncFile(filekey string) {
	var fileinfo db.FileInfo
	db.GetDB().Where("file_key = ?", filekey).First(&fileinfo)
	if fileinfo != (db.FileInfo{}) {
		nc := socket.GetNodeConf()
		fileinfo.SyncStatus = "Y"
		for _, s := range nc.NoticeStores {
			r := api.PostNultipartFile(fileinfo, s.StoreIp, s.HttpPort)
			if r.Code != "0" {
				fileinfo.SyncStatus = "N"
			}
			time.Sleep(time.Millisecond * 100)
		}
		db.GetDB().Save(&fileinfo)
	}
}

//创建TCP服务端
func createTcpServer(server socket.TranmitServerCmd) {
	logcat.Info("###开始创建TCP")
	s := socket.NewSocket()
	syschannel := s.MakeTcpChannel(strconv.Itoa(server.DbPort), 1024*1024*6, 0, socket.ControllerHandler)
	s.BindServer(syschannel)
	s.ListenTcp()
	//发送创建TCP完成指令
	var serverResult socket.TranmitServerResultCmd
	serverResult.Code = "0"
	serverResult.Msg = "处理成功"
	serverResult.TaskId = server.TaskId
	c := socket.GetConn()
	if c.Alive {
		c.Conn.Write(socket.GenerateCreateTcpServerResult(serverResult))
	}
}

//批量处理数据库数据
func batchDbInfo(sdb socket.SyncDb) {
	logcat.Info("###批量处理数据")
	//总数小于hcout那么 hcount = total
	if sdb.Total < hcount {
		logcat.Info("###sdb.Total < hcount")
		hcount = sdb.Total
	}
	atomic.AddInt64(&count, 1)
	//开始批量处理数据
	d := sdb.Data
	var fileinfo db.FileInfo
	fileinfo.FileKey = d.FileKey
	fileinfo.FilePath = d.Content
	fileinfo.FileOriginalName = d.FileOriginalName
	fileinfo.FileName = d.FileName
	fileinfo.FileSize = d.FileSize
	fileinfo.FileSuffixName = d.FileSuffixName
	files = append(files, fileinfo)
	c := atomic.LoadInt64(&count)
	logcat.Info("###c =", c)
	if int(c) == hcount {
		logcat.Info("######int(c) == hcount")
		//批量处理数据库记录
		tx := db.GetDB().Begin()
		for _, f := range files {
			var fi db.FileInfo
			tx.Where("file_key = ?", f.FileKey).First(&fi)
			if fi == (db.FileInfo{}) {
				f.SyncStatus = "Y"
				dberr := tx.Create(&f).Error
				if dberr != nil {
					logcat.Info("###入库异常:", dberr)
					tx.Rollback()
					return
				}
			}
		}
		//提交事务
		tx.Commit()
		//开始上报进度
		percent := float32(hcount) / float32(sdb.Total) * 100
		logcat.Info("同步数据完成传输进度:", percent, "%")
		conn := socket.GetConn()
		if conn.Alive {
			p := socket.SyncPercent{
				TaskId:   sdb.TaskId,
				Percent:  int(percent),
				TargetId: "",
				StoreId:  conf.GetStoreid(),
				RecordId: sdb.Data.ID,
			}
			conn.Conn.Write(socket.GenerateSyncData(p))
		}
		if percent == 100 {
			//关闭监听
			files = files[0:0]
			socket.StopTcpListenChan <- true
		}
	}
}

//连接迁移服务器
func connTransferTcp(connTcp socket.ConnTcpServerCmd) {
	logcat.Info("###开始迁移数据-------------端口:", connTcp.DbPort, " //IP:", connTcp.IP)
	if !isRunning {
		logcat.Info("###开始迁移数据-------------isRunning = true!")
		isRunning = true
		wg.Add(1)
		//开始连接迁移服务端数据端口
		if utils.GetInternetIp() == connTcp.IP {
			connTcp.IP = utils.GetLocalIPaddress()
		}
		dbconn, dbConErr := socket.BuildTcpConn(connTcp.IP, strconv.Itoa(connTcp.DbPort))
		if dbConErr != nil {
			logcat.Info("####连接TCP服务失败原因:", dbConErr)
			c := socket.GetConn()
			if c.Alive {
				c.Conn.Write(socket.EncoderData(socket.GetResponse("7", 233, "-1", "处理失败")))
				return
			}
		}
		//开始传输数据
		startTransferDb(0, connTcp.TaskId, dbconn)
		wg.Wait()
		isRunning = false
	}
}

// offsetId returns a gorm scope restricting results to rows with an id
// greater than the given record id.
func offsetId(recordid int64) func(db *gorm.DB) *gorm.DB {
	return func(tx *gorm.DB) *gorm.DB {
		return tx.Where("id > ?", recordid)
	}
}

//开始同步数据库数据
func startTransferDb(recordid, taskid int64, dbconn *net.TCPConn) {
	logcat.Info("###开始传输数据-------------!")
	var total int64 = 0
	var fileinos []db.FileInfo
	var scopes []func(*gorm.DB) *gorm.DB
	if recordid != 0 {
		scopes = append(scopes, offsetId(recordid))
	}
	db.GetDB().Where("id > ?", recordid).Find(&fileinos).Count(&total)
	logcat.Info("###总数:", total)
	//defer dbconn.Close()
	if total != 0 {
		//pagesize
		pagesize := 1000
		//pages
		pages := int(total) / pagesize
		if pages == 0 {
			pages++
		}
		logcat.Info("##pages:", pages)
		var syncdb socket.SyncDb
		syncdb.Total = int(total)
		syncdb.StoreId = conf.GetStoreid()
		syncdb.TaskId = taskid
		for i := 1; i <= pages; i++ {
			logcat.Info("#####index =", i)
			if isStop { //停止传输
				break
			}
			db.PageSyncQuery(scopes, pagesize, i, "").Find(&fileinos)
			for _, f := range fileinos {
				var syncdata socket.SyncData
				s := strings.Split(f.FilePath, "/")
				filepath := conf.GetStorePath() + s[len(s)-2] + "/" + f.FileName
				content := s[len(s)-2]
				//查找文件数据
				file, err := os.Open(filepath)
				info, _ := file.Stat()
				if err != nil {
					logcat.Info("##打开文件异常:", err)
					continue
				}
				//判断文件大小
				filesize := int(info.Size())
				syncdata.FileKey = f.FileKey
				syncdata.FileOriginalName = f.FileOriginalName
				syncdata.FileName = f.FileName
				syncdata.FileSize = f.FileSize
				syncdata.FileSuffixName = f.FileSuffixName
				syncdata.ID = f.Id
				syncdata.Content = content
				bufsize := 1024 * 1024 * 5
				if filesize > bufsize {
					//超过5M的文件使用分割
					buf := make([]byte, bufsize)
					num := filesize / bufsize
					if filesize%bufsize != 0 {
						num = num + 1
					}
					readFileSucess := false
					for j := 0; j < num; j++ {
						//文件流读取
						n, readErr := file.Read(buf)
						if (readErr != nil && readErr != io.EOF) || n <= 0 {
							logcat.Info("###读取文件异常:", readErr)
							readFileSucess = true
							break
						}
						syncdata.FileByte = buf
						syncdata.Pack = num
						syncdata.PackNo = j
						syncdb.Data = syncdata
						//tcp传输数据
						if dbconn != nil {
							dbconn.Write(socket.GenerateTransferDbCmd(syncdb))
						}
					}
					if readFileSucess {
						continue
					}
				} else {

					//正常读取文件
					buf := make([]byte, filesize)
					n, readErr := file.Read(buf)
					if (readErr != nil && readErr != io.EOF) || n <= 0 {
						logcat.Info("###读取文件异常:", readErr)
						continue
					}
					syncdata.FileByte = buf
					syncdata.Pack = 1
					syncdata.PackNo = 1
					syncdb.Data = syncdata
					//tcp传输数据
					if dbconn != nil {
						dbconn.Write(socket.GenerateTransferDbCmd(syncdb))
					}

				}

			}
		}
		<-socket.StopConnTcpChan
		logcat.Info("###关闭TCP连接-------------完成传输--!")
		dbconn.Close()
		wg.Done()

	}

}
